package cn.ccccltd.waf.message.util;

import java.util.Map;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ScheduledMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.util.CollectionUtils;

import com.alibaba.fastjson.JSONObject;

import cn.ccccltd.waf.message.constant.MessageConstants;
import cn.ccccltd.waf.message.context.SpringContext;
import cn.ccccltd.waf.message.model.MessageCommon;
import cn.ccccltd.waf.message.queue.log.constant.LogConstants;

/**
 * 创建日期:2017年11月2日
 * Title: 公用的发送消息的方法
 * Description：对本文件的详细描述，原则上不能少于50字
 * @author yangjingjiang
 * @mender：（文件的修改者，文件创建者之外的人）
 * @version 1.0
 * Remark：认为有必要的其他信息
 */
public class QueueUtils{
	
	private static JmsTemplate scheduledJmsTemplate;
	
	private static Connection connection;

	/**
	 * @return 返回 scheduledJmsTemplate。
	
	 */
	public static JmsTemplate getScheduledJmsTemplate() {
		
		if (null == scheduledJmsTemplate) {
			scheduledJmsTemplate = SpringContext.getBean("scheduledJmsTemplate");
		}
		
		return scheduledJmsTemplate;
	}
	
	/**
	 * @return 返回 connection。
	
	 */
	private static  Connection getConnection() {
		
		if (null == connection) {
			connection = SpringContext.getBean("getConnection");
		}
		
		return connection;
	}


	/**
	 * 功能: 发送消息到更新状态队列(含有发送属性及发送重要程度)<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月3日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param jmsTemplate
	 * @param msg
	 * @param statue
	 */
	public static<T extends MessageCommon> void sendMessageToSmsUpdateTableQueue(JmsTemplate jmsTemplate, JSONObject msg ,String statue) {
		
		jmsTemplate.send(new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				
				TextMessage message = session.createTextMessage(JsonUtils.toJson(msg));
				
				//设置优先队列
				message.setJMSPriority(msg.getInteger("msgScope"));
				
				message.setStringProperty(MessageConstants.SEND_STATUE_PROPERTY, statue); 
				
				return message;
			}
		});
		
	}
	
	
	/**
	 * 功能: 发送消息到消息队列(common，含有消息优先级)<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月3日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param jmsTemplate
	 * @param msg
	 * @param params
	 */
	@SafeVarargs
	public static void sendMessageToQueue(JmsTemplate jmsTemplate, JSONObject msg, Map<String,String>... params) {

		jmsTemplate.send(new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				
				TextMessage message = session.createTextMessage(msg.toJSONString());
				//设置消息重要程度
				message.setJMSPriority(msg.getInteger("msgScope"));
				//进行分组
				message.setStringProperty("JMSXGroupID", msg.getString(MessageConstants.ID));

				boolean isLog = false;
				
				for (Map<String, String> param : params) {
					if(null != param.get(LogConstants.LOG_MESSAGE_ID)) {
						isLog = true;
					}
					if (!CollectionUtils.isEmpty(param)) {
						for (Map.Entry<String, String> entrySet : param.entrySet()) {
							
							message.setStringProperty(entrySet.getKey(), entrySet.getValue()); 
							
						}
					}
				}
				
				if (StringUtils.isNotBlank(msg.getString(MessageConstants.SENDTIME)) && !isLog) {
					//延时消息时间发送
					long lo = DateUtilsNew.parseDate(msg.getString(MessageConstants.SENDTIME)).getTime()-DateUtilsNew.getNow().getTime();
					
					if(lo>0) {
						
						message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, lo);
					}
					
				}
				
				return message;
			}
		});
		
	}

	/**
	 * 功能: 删除定时scheduler任务的接口<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月6日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param jmsXGroupID
	 */
	public static void deleteSchedulerJob(String jmsXGroupID) {

		try {
			//设置消息的确认模式
			Session session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);

			// Create the Browse Destination and the Reply To location
			Destination management = session.createQueue(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
			Destination browseDest = session.createTemporaryQueue();

			// Create the eventual Consumer to receive the scheduled message
			MessageConsumer consumer = session.createConsumer(browseDest , "JMSXGroupID = '"+jmsXGroupID+"'");
			MessageProducer producer = session.createProducer(management);

			consumer.setMessageListener(new MessageListener() {
			    @Override
			    public void onMessage(Message message) {
						try {
							TextMessage t = ((TextMessage)message);
							Message request = session.createMessage();
							request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
							request.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, message.getStringProperty("scheduledJobId"));
							producer.send(request);
							//更新消息状态
							JSONObject json = JsonUtils.toObject(t.getText(), JSONObject.class);
							//设置取消状态
							json.put("sendState", MessageConstants.SEND_FAIL_CANCEL);
							
							sendMessageToQueue(getScheduledJmsTemplate(), json);
							
						} catch (JMSException e) {
							e.printStackTrace();
						}
			    }
			});

			// Send the browse request
			Message request = session.createMessage();
			request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
			request.setJMSReplyTo(browseDest);
			producer.send(request);
		} catch (JMSException e) {
			e.printStackTrace();
		}
    
	}
	
}
